feat(spring-jakarta): [Queue Instrumentation 4] Add Kafka consumer instrumentation#5255
feat(spring-jakarta): [Queue Instrumentation 4] Add Kafka consumer instrumentation#5255adinauer wants to merge 6 commits intofeat/queue-instrumentation-producerfrom
Conversation
Add SentryKafkaRecordInterceptor that creates queue.process transactions for incoming Kafka records. Forks scopes per record, extracts sentry-trace and baggage headers for distributed tracing via continueTrace, and calculates messaging.message.receive.latency from the enqueued-time header. Composes with existing RecordInterceptor via delegation. Span lifecycle is managed through success/failure callbacks. Add SentryKafkaConsumerBeanPostProcessor to register the interceptor on ConcurrentKafkaListenerContainerFactory beans. Co-Authored-By: Claude <noreply@anthropic.com>
Semver Impact of This PR🟡 Minor (new features) 📋 Changelog PreviewThis is how your changes will appear in the changelog. New Features ✨Spring Jakarta
Other
Internal Changes 🔧Deps
Other
🤖 This preview updates automatically when you update the PR. |
|
Sentry Build Distribution
|
Performance metrics 🚀
|
271fb8b to
1f00027
Compare
…strumentation-consumer
…rning log Update SentryKafkaRecordInterceptor and its test to reference SentryProducerInterceptor instead of the removed SentryKafkaProducerWrapper. Add a warning log in SentryKafkaConsumerBeanPostProcessor when reflection fails to read the existing RecordInterceptor, so users know their custom interceptor may not be chained. Co-Authored-By: Claude <noreply@anthropic.com>
…strumentation-consumer
… ordering Add initForTest/close to SentryKafkaRecordInterceptorTest to fix NPE from TransactionContext constructor requiring initialized Sentry. Regenerate API file to fix alphabetical ordering of SentryProducerInterceptor entry. Co-Authored-By: Claude <noreply@anthropic.com>
|
I investigated whether we should switch the consumer composition to Spring’s I don’t think we should for this case, because it is strictly weaker than our current wrapper behavior:
Given the ThreadLocal/lifecycle-token leak risk we just fixed, that |
| } | ||
|
|
||
| @SuppressWarnings("rawtypes") | ||
| final RecordInterceptor sentryInterceptor = |
There was a problem hiding this comment.
Any particular reason, we are going the composite route in the producer but adding the existing interceptor as a delegate here?
There was a problem hiding this comment.
There is a CompositeRecordInterceptor
PR Stack (Queue Instrumentation)
📜 Description
Add Kafka consumer instrumentation to
sentry-spring-jakarta:SentryKafkaRecordInterceptor— implementsRecordInterceptorto createqueue.processtransactions for incoming Kafka records. Forks scopes per record for isolation (ISentryLifecycleTokenlifecycle). Extractssentry-traceandbaggageheaders for distributed tracing viacontinueTrace, and calculatesmessaging.message.receive.latencyfrom the enqueued-time header. Composes with existingRecordInterceptorvia delegation. Span lifecycle is managed through success/failure callbacks.SentryKafkaConsumerBeanPostProcessor— registers the interceptor onConcurrentKafkaListenerContainerFactorybeans.💡 Motivation and Context
Consumer-side instrumentation is needed to create process transactions and continue the distributed trace from the producer.
💚 How did you test it?
Unit tests for
SentryKafkaRecordInterceptorandSentryKafkaConsumerBeanPostProcessor.📝 Checklist
sendDefaultPIIis enabled.🔮 Next steps